{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Stream custom generator functions\n",
    "\n",
    "You can use generator functions (ie. functions that use the `yield` keyword, and behave like iterators) in a LCEL pipeline.\n",
    "\n",
    "The signature of these generators should be `Iterator[Input] -> Iterator[Output]`. Or for async generators: `AsyncIterator[Input] -> AsyncIterator[Output]`.\n",
    "\n",
    "These are useful for:\n",
    "- implementing a custom output parser\n",
    "- modifying the output of a previous step, while preserving streaming capabilities\n",
    "\n",
    "Let's implement a custom output parser for comma-separated lists."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Sync version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install --upgrade --quiet  langchain langchain-openai"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Iterator, List\n",
    "\n",
    "from langchain.prompts.chat import ChatPromptTemplate\n",
    "from langchain_core.output_parsers import StrOutputParser\n",
    "from langchain_openai import ChatOpenAI\n",
    "\n",
    "prompt = ChatPromptTemplate.from_template(\n",
    "    \"Write a comma-separated list of 5 animals similar to: {animal}\"\n",
    ")\n",
    "model = ChatOpenAI(temperature=0.0)\n",
    "\n",
    "str_chain = prompt | model | StrOutputParser()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "lion, tiger, wolf, gorilla, panda"
     ]
    }
   ],
   "source": [
    "for chunk in str_chain.stream({\"animal\": \"bear\"}):\n",
    "    print(chunk, end=\"\", flush=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'lion, tiger, wolf, gorilla, panda'"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "str_chain.invoke({\"animal\": \"bear\"})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This is a custom parser that splits an iterator of llm tokens\n",
    "# into a list of strings separated by commas\n",
    "def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:\n",
    "    # hold partial input until we get a comma\n",
    "    buffer = \"\"\n",
    "    for chunk in input:\n",
    "        # add current chunk to buffer\n",
    "        buffer += chunk\n",
    "        # while there are commas in the buffer\n",
    "        while \",\" in buffer:\n",
    "            # split buffer on comma\n",
    "            comma_index = buffer.index(\",\")\n",
    "            # yield everything before the comma\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            # save the rest for the next iteration\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    # yield the last chunk\n",
    "    yield [buffer.strip()]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "list_chain = str_chain | split_into_list"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['lion']\n",
      "['tiger']\n",
      "['wolf']\n",
      "['gorilla']\n",
      "['panda']\n"
     ]
    }
   ],
   "source": [
    "for chunk in list_chain.stream({\"animal\": \"bear\"}):\n",
    "    print(chunk, flush=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['lion', 'tiger', 'wolf', 'gorilla', 'panda']"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "list_chain.invoke({\"animal\": \"bear\"})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Async version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import AsyncIterator\n",
    "\n",
    "\n",
    "async def asplit_into_list(\n",
    "    input: AsyncIterator[str],\n",
    ") -> AsyncIterator[List[str]]:  # async def\n",
    "    buffer = \"\"\n",
    "    async for (\n",
    "        chunk\n",
    "    ) in input:  # `input` is a `async_generator` object, so use `async for`\n",
    "        buffer += chunk\n",
    "        while \",\" in buffer:\n",
    "            comma_index = buffer.index(\",\")\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    yield [buffer.strip()]\n",
    "\n",
    "\n",
    "list_chain = str_chain | asplit_into_list"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['lion']\n",
      "['tiger']\n",
      "['wolf']\n",
      "['gorilla']\n",
      "['panda']\n"
     ]
    }
   ],
   "source": [
    "async for chunk in list_chain.astream({\"animal\": \"bear\"}):\n",
    "    print(chunk, flush=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['lion', 'tiger', 'wolf', 'gorilla', 'panda']"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "await list_chain.ainvoke({\"animal\": \"bear\"})"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
